-
Notifications
You must be signed in to change notification settings - Fork 3.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[Broker] Fix broker dispatch byte rate limiter. #11135
[Broker] Fix broker dispatch byte rate limiter. #11135
Conversation
@@ -276,8 +278,10 @@ public synchronized void readMoreEntries() { | |||
} | |||
} | |||
|
|||
protected int calculateNumOfMessageToRead(int currentTotalAvailablePermits) { | |||
// left pair is messagesToRead, right pair is bytesToRead | |||
protected MutablePair<Integer, Integer> calculateToRead(int currentTotalAvailablePermits) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why need MutablePair here? we'd better do not allow to modify it outside?
@@ -349,7 +352,7 @@ protected void readMoreEntries(Consumer consumer) { | |||
} | |||
} | |||
|
|||
protected int calculateNumOfMessageToRead(Consumer consumer) { | |||
protected MutablePair<Integer, Integer> calculateToRead(Consumer consumer) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is it possible to move to AbstractBaseDispatcher
? Looks many duplicated code with the PersistentDispatcherMultipleConsumers.
for (int i = 0; i < numProducedMessages; i++) { | ||
producer.send(new byte[byteRate / 10]); | ||
producer.send(new byte[99]); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's better to use asynchronous API? If we use synchronous API, this means for each message we need to ensure the publish latency is lower than 50ms, otherwise it should be a flaky test.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
And what's the reason change to new byte[99]
} | ||
|
||
latch.await(); | ||
Assert.assertEquals(totalReceived.get(), numProducedMessages); | ||
Awaitility.await().atLeast(3, TimeUnit.SECONDS) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why we need atLeast
here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we need to check when the time less than 3 second consumer con't receive messages more than 6.
@lhotari PTAL |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why not use ImmutablePair ?
@@ -234,6 +238,19 @@ public void resetCloseFuture() { | |||
// noop | |||
} | |||
|
|||
protected static Pair<Integer, Integer> calculateToRead(int messagesToRead, int availablePermitsOnMsg, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe a better name like computeReadLimits
?
Can parameters be ordered as: messagesToRead, availableMsgPermits, bytesToRead, availableBytePermits
just to have a consistent grouping
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
good suggestion.
} | ||
|
||
if (availablePermitsOnByte > 0) { | ||
bytesToRead = availablePermitsOnByte; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
dispatcherMaxReadSizeBytes
was used to limit max number of bytes that can be read from bookies in one cursor read op. With this change if user configures a dispatch rate of say 100MB/s, it will mean that bytesToRead can be 100MB (since we do not do Math.min here)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
good catch!
} | ||
Pair<Integer, Integer> calculateResult = calculateToRead(messagesToRead, | ||
(int) topicRateLimiter.getAvailableDispatchRateLimitOnMsg(), | ||
(int) topicRateLimiter.getAvailableDispatchRateLimitOnByte(), bytesToRead); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we do type conversion for bytes from long to int? Could cause bugs when user sets dispatch rate higher than 2GB (unlikely but possible)
@@ -263,8 +264,7 @@ public synchronized void readMoreEntries() { | |||
consumerList.size()); | |||
} | |||
havePendingRead = true; | |||
cursor.asyncReadEntriesOrWait(messagesToRead, serviceConfig.getDispatcherMaxReadSizeBytes(), | |||
this, | |||
cursor.asyncReadEntriesOrWait(messagesToRead, bytesToRead, this, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
bytesToRead can potentially be -1 now (like at line 315 when new MutablePair<>(-1, -1);
is returned) when dispatch limit is not set. This changes the default behaviour because earlier it was capped by serviceConfig.getDispatcherMaxReadSizeBytes()
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
in line 230, when bytesToRead or messageToRead is -1, it will not process read operation.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
thank for you review, I left some question and change the code. please review again. thanks.
@@ -234,6 +238,19 @@ public void resetCloseFuture() { | |||
// noop | |||
} | |||
|
|||
protected static Pair<Integer, Integer> calculateToRead(int messagesToRead, int availablePermitsOnMsg, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
good suggestion.
} | ||
|
||
if (availablePermitsOnByte > 0) { | ||
bytesToRead = availablePermitsOnByte; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
good catch!
@@ -263,8 +264,7 @@ public synchronized void readMoreEntries() { | |||
consumerList.size()); | |||
} | |||
havePendingRead = true; | |||
cursor.asyncReadEntriesOrWait(messagesToRead, serviceConfig.getDispatcherMaxReadSizeBytes(), | |||
this, | |||
cursor.asyncReadEntriesOrWait(messagesToRead, bytesToRead, this, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
in line 230, when bytesToRead or messageToRead is -1, it will not process read operation.
changed |
Pair<Integer, Long> calculateResult = computeReadLimits(messagesToRead, | ||
(int) topicRateLimiter.getAvailableDispatchRateLimitOnMsg(), | ||
topicRateLimiter.getAvailableDispatchRateLimitOnByte(), bytesToRead); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You have not changed the order of arguments here (after changing the order in the function definition). It should be
computeReadLimits(
messagesToRead,
(int) topicRateLimiter.getAvailableDispatchRateLimitOnMsg(),
bytesToRead,
topicRateLimiter.getAvailableDispatchRateLimitOnByte());
Please fix at other places too where this function is called.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done, could you please review again? if don't have any other comment, please approve this pr then we will merge it :)
@anvinjain Do you want to take a final look? |
## Motivation fix #11044 now dispatcher byte rate limit don't limit every cursor read. When cursor read always use `ServiceConfiguration.dispatcherMaxReadSizeBytes` to read. It will cause that dispatcher read entries by `ServiceConfiguration.dispatcherMaxReadSizeBytes` to read every time. (cherry picked from commit ce6be12)
The PR #11135 couldn't be cherry-picked to branch-2.7, because there are too many conflicts. ## Motivation fix #11044 now dispatcher byte rate limit don't limit every cursor read. When cursor read always use `ServiceConfiguration.dispatcherMaxReadSizeBytes` to read. It will cause that dispatcher read entries by `ServiceConfiguration.dispatcherMaxReadSizeBytes` to read every time. ## implement when cursor read entries size need to calculate, the calculate result by dispatcher bytes limiter.
## Motivation fix apache#11044 now dispatcher byte rate limit don't limit every cursor read. When cursor read always use `ServiceConfiguration.dispatcherMaxReadSizeBytes` to read. It will cause that dispatcher read entries by `ServiceConfiguration.dispatcherMaxReadSizeBytes` to read every time.
Motivation
fix #11044
now dispatcher byte rate limit don't limit every cursor read. When cursor read always use
ServiceConfiguration.dispatcherMaxReadSizeBytes
to read. It will cause that dispatcher read entries byServiceConfiguration.dispatcherMaxReadSizeBytes
to read every time.implement
when cursor read entries size need to calculate, the calculate result by dispatcher bytes limiter.
Verifying this change
Add the tests for it
Does this pull request potentially affect one of the following parts:
If yes was chosen, please highlight the changes
Dependencies (does it add or upgrade a dependency): (no)
The public API: (no)
The schema: (no)
The default values of configurations: (no)
The wire protocol: (no)
The rest endpoints: (no)
The admin cli options: (no)
Anything that affects deployment: (no)